BigQuery で Airflow DAG のオーケストレーションができるようになりました(Preview)

BigQuery で Airflow DAG のオーケストレーションができるようになりました(Preview)

Clock Icon2024.10.26

こんにちは!エノカワです。

BigQuery で Airflow DAG をオーケストレーションする機能プレビュー となりました。
この機能により、BigQuery で Airflow DAG を表示、トリガー、一時停止することが可能になりました。

You can now view, trigger, and pause Airflow DAGs in BigQuery.

今回は、この新機能を実際に試してみましたので、その内容をご紹介します。

環境作成

まずは、Cloud Composer 3 環境を作成します。

Google Cloud コンソールで Cloud Composer の ページに移動します。

[環境の作成] をクリックし、表示されるバージョンリストから [Composer 3] を選択します。

bigquery-orchestrate-dags_01

test-composer3という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はデフォルトのままで作成します。

bigquery-orchestrate-dags_02

DAG を作成する

次に、Cloud Composer 環境で実行するDAGを作成します。

今回は、Google Cloud Storage (GCS) にあるファイルを BigQuery に取り込むための Airflow DAG を作成しました。

gcs_to_bigquery.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='gcs_to_bigquery',
    default_args=default_args,
    schedule_interval=None,  # 手動で実行する
    catchup=False
) as dag:

    # GCS から BigQuery へのデータ取り込み
    gcs_to_bq = GCSToBigQueryOperator(
        task_id='load_gcs_to_bq',
        bucket=BUCKET_NAME,  # GCS バケット名
        source_objects=['sales.csv'],  # GCS 内のファイルパス
        destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}',  # BigQuery のデータセットとテーブル
        source_format='CSV',  # ソースファイル形式
        write_disposition='WRITE_TRUNCATE',  # 既存データの上書き
        skip_leading_rows=1  # CSV のヘッダー行をスキップ
    )

    gcs_to_bq

この DAG では、Google Cloud Storage に保存された CSV ファイル (sales.csv) を BigQuery のテーブル(sales_data)にインポートしています。

この DAG を Cloud Composer にデプロイし、Airflow UI で DAG gcs_to_bigquery が表示されていることを確認します。

bigquery-orchestrate-dags_04

DAG を表示する

BigQuery コンソールで、オーケストレーションページに移動し、DAG 表示を確認してみましょう。

先ほどデプロイした DAG gcs_to_bigquery だけではなく、liveness DAG airflow_monitoring も表示されていました。
BigQuery に関連する DAG だけではなく、Cloud Composer にデプロイされている DAG すべてが表示対象のようです。

bigquery-orchestrate-dags_05

列の表示オプション

列の表示オプションをクリックして、DAG実行の詳細を含む追加の列を表示することができます。
以下の項目が選択可能です。

  • 状態
  • 名前
  • 種類
  • ソース
  • 送信先
  • トリガー
  • 作成日時(UTC+9)
  • 最終更新(UTC+9)
  • 最後に失敗した実行(UTC+9)
  • オーナー
  • リージョン
  • 前回の実行(UTC+9)

bigquery-orchestrate-dags_06

フィルタ

検索窓からキーワードで DAG をフィルタリングすることが可能です。

bigquery-orchestrate-dags_07

また、画面左側のフィルタパネルの表示対象をフィルタリングすることが可能です。

Composer DAG 以外にも以下の種類が選択可能です。

  • ノートブック
  • データブック
  • データ準備(Data Preparation)
  • Dataform
  • ワークフロー

また、種類以外にも下記のフィルタ項目があります。

  • 州(※)
  • 前回の実行ステータス
  • 前回の実行時間
  • リージョン

※:State(状態)の誤訳と思われます

bigquery-orchestrate-dags_08

Cloud Composer コンソールの DAG 表示

ちなみに、Cloud Composer コンソールでも [DAG] タブから DAG 表示することが可能です。
BigQuery オーケストレーションの DAG 表示とは項目が異なっていました。

bigquery-orchestrate-dags_09

DAG を実行する

選択した DAG を含む行で、縦三点リーダーから [DAG をトリガー] をクリックすると、DAG を手動でトリガーすることができます。

bigquery-orchestrate-dags_10

DAG を手動でトリガーすると、DAG に指定されたスケジュールとは関係なく、DAG を 1 回実行します。

bigquery-orchestrate-dags_11

Airflow UI の DAG 詳細画面でトリガー実行されていることが確認できました。

bigquery-orchestrate-dags_12

DAG の詳細を表示する

DAG の名前をクリックすると、DAG の詳細を表示することができます。

bigquery-orchestrate-dags_13

過去の DAG 実行

[実行] タブでは、過去の DAG 実行が表示されます。
デフォルトで過去 10 日間の DAG 実行が表示されますが、[10日] のドロップダウン メニューで時間範囲を変更することが可能です。

bigquery-orchestrate-dags_14

DAG の視覚化

[図] タブでは、タスクの依存関係を含む DAG の視覚化が表示されます。
図上のタスクを選択することで、タスクの詳細を表示することが可能です。

bigquery-orchestrate-dags_15

DAG のソース コード

[コード] タブでは、DAG のソース コードが表示されます。

bigquery-orchestrate-dags_16

DAG の詳細

[詳細] タブでは、DAG の詳細情報が表示されます。

bigquery-orchestrate-dags_17

DAG を一時停止する

選択した DAG を含む行で、縦三点リーダーから [DAG の一時停止] をクリックすると、DAG を一時停止することができます。

bigquery-orchestrate-dags_18

bigquery-orchestrate-dags_19

一時停止した DAG を含む行で、縦三点リーダーから [DAG の一時停止を解除] をクリックすると、DAG をアクティブにすることができます。

bigquery-orchestrate-dags_20

bigquery-orchestrate-dags_21

まとめ

以上、BigQuery で Airflow DAG をオーケストレーションする機能のご紹介でした。

BigQuery 上で直接 Airflow DAG の表示や操作ができるので、Cloud Composer に切り替えて操作する手間が減り、データパイプラインの運用が効率的になると感じました。

現在はプレビュー版ですが、今後の正式リリースに向けて、オーケストレーションページでの機能がさらに充実することを期待しています!

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.